Read .fits raw data¶
For reading the .fits data-set one may use the utils library. From the utils library, we can use the data_helper.Reader() object, that we created to compress the code, and facilitate the reading process. Inside a .fits file, we usually find a sctructure with three tables of type HUD. In this particular case, the three tables teels a history of the data, where the first is the most raw data possible, then teh second is the treated/filtered version of the first table, and the third table
is the compressed version of the information on the second table.
We are interested in the most informative data-set in a minimal (compressed) manner. Provided that, here we do not have the luxury of dealing with big-data problems… for that we might needSparkorHadoopsupport.
Here we will read all the files from a given folder, and label it as disered. As an example my folder structure is the following:
./database
│
├── bright_stars
│ ├── ..._1.fits
│ ├── ..._2.fits
│ ⋮ ⋮
│ └── ..._k.fits
│
├── confirmed_targets
│ ├── ..._1.fits
│ ├── ..._2.fits
│ ⋮ ⋮
│ └── ..._l.fits
│
├── eclipsing_binaries
│ ├── ..._1.fits
│ ├── ..._2.fits
│ ⋮ ⋮
│ └── ..._m.fits
│
└── red_giants
├── ..._1.fits
├── ..._2.fits
⋮ ⋮
└── ..._n.fits
Inside the database/bright_stars we have all .fits files of class bright stars, in database/confirmed_targets we have all .fits files of class confirmed exoplanets, in database/eclipsing binaries we have all .fits files of class confirmed multi transit eclipsing binaries and in database/red_giants we have all .fits files of class red giants.
After reading the folders we concatenate the provided list with the curves compressed light curve information in the curves variable… where we will have a list of data_struc.Curve objects.
[5]:
from utils import *
folder_list = [ './database/raw_fits/confirmed_targets',
'./database/raw_fits/eclipsing_binaries',
'./database/raw_fits/red_giants',
'./database/raw_fits/bright_stars' ]
dread = data_helper.Reader()
curves = dread.from_folder(folder=folder_list[0], label='confirmed targets', index=2)
curves += dread.from_folder(folder=folder_list[1], label='eclipsing binaries', index=2, items=40)
#curves += dread.from_folder(folder=folder_list[2], label='red giants', index=2)
#curves += dread.from_folder(folder=folder_list[3], label='bright stars', index=2)
INFO:reader_log:Reader module created...
INFO:reader_log:Reading 37 curve packages...
INFO:reader_log:Reading 40 curve packages...
Importing the utils as show, one is actually import all the following packages:
data_helperwith Reader Objectdata_strucwith Curve Objectvisualwith several plot functionspreprocwith several data preprocessing rountines
And as an example, the visual package just compress the bokeh code library, since it is pretty expansive code, for example, to make a line plot without visual, one should do something like
from bokeh.palettes import Magma
from bokeh.layouts import column
from bokeh.plotting import figure, show
from bokeh.models import ColumnDataSource
from bokeh.io import output_notebook, push_notebook
p = figure(
title="Some title",
plot_width=400,
plot_height=600)
# Style the figure image
p.grid.grid_line_alpha = 0.1
p.xgrid.band_fill_alpha = 0.1
p.xgrid.band_fill_color = Magma[10][1]
p.yaxis.axis_label = "Some label for y axis"
p.xaxis.axis_label = "Some label for x axis"
# Place the information on plot
p.line(x_data, y_data,
legend_label="My legend label",
line_width=2,
color=Magma[10][2],
muted_alpha=0.1,
line_cap='rounded')
p.legend.location = "right_top"
p.legend.click_policy = "disable"
show(p)
and the same can be achieved using visual with:
[21]:
p = visual.line_plot(curves[25]["DATE"],
curves[25]['WHITEFLUXSYS'],
legend_label='Raw Light Curve',
title='Example curve plot',
y_axis={'label': 'Intensity'},
x_axis={'type': 'datetime',
'label': 'Date'})
visual.show_plot(p)
Preprocessing data¶
The preprocessing part of this analysis will include the preparation of the light curve data for each observation and saving both the original data, such as the preprocessed data in a more suitable data format for Python users in the future. The main goal is to read the data from the .fits file (already done with data_helper.Reader()), filter the data to remove dynamical noise, and than save in a suitable format such as .pickle, or .mat for those in favor…
Here is the step where we will filter the data. To do that, first we must choose between the two possible paths
Continuous time filters
Discrete time filters
A practice advise, it is always preferable discrete time filtering, because the routines are simple and more efficient… but this is not always possible, when dealing with several time series. To go with the discrete approach, we must check if all time series has a close mean sample time, with low variance. If not, we must first resample the time series, and then use discrete filtering techniques on the data. Do not try, to apply discrete filtering techniques on analog data (without a consistent sample time), otherwise you will apply a technique that is not a linear transfomation on the data, and therefore, you will be, by hand, introducing noise to the data.
So first, we will check the sample time of the series, and after we will filter the high frequency noise from the data. The usefulness of the filtering of the time series, will be shown in the final of the document, where a feature extraction technique will be applied just as an example on how the denoised time series is so necessary.
Resampling series¶
Sample time distribution¶
To analyse the time sample of the series, one can do a box plot for each time-series curve. At each candle plot it is represented the distribution of the diference t[k] - t[k-1] for k representing each sample of the time-series and t the sampled time.
[6]:
import numpy as np
from datetime import datetime
item = 0
values, labels = [], []
for curve in curves:
diff_time = np.diff(curve["DATE"])
values += [x.total_seconds() / 60 for x in diff_time]
labels += [str(item) for x in diff_time]
item += 1
# ╔══════ Include comment here
# ║ to enhance the outliers
p = visual.box_plot(values, labels, y_range=(8.515, 8.555),
title='Average sample time within curves',
y_axis={'label': 'Sample time (min)'},
x_axis={'label': 'Curves'})
visual.show_plot(p)
Resample time series data¶
Since the time sample time do not vary that much from one light curve to another, not considering the outliers… One can create the sample time as the mean of the median and then resample each time series using this found value.
One also might say that those time series, does not need resampling, since all the series present a pretty close sample time mean. But notice that the above figure has the ``y`` axis clipped from ``[8.515, 8.555]``. If the user comment the line described on the above cell, one will see that there are some worrying outliers.It is because of those outliers that a resampling technique must be applied!
[7]:
import statistics as stt
time_sample = round(stt.mean(values), 2)
print("The time sample is now {} minutes.".format(time_sample))
The time sample is now 8.57 minutes.
Now that we have a feasible approximation of the time sample, it is possible to use this reference to reseample each time series to this single sample rate.
[8]:
import scipy.signal as scs
data = {'y':[], 't':[], 'ti':[], 'lab':[]}
for curve in curves:
# Get the time information
init_time = curve['DATE'][0]
data_vector = curve['WHITEFLUXSYS']
time_vector = [(t - init_time).total_seconds() / 60 for t in curve['DATE']]
# Compute the amount of points
n_points = int(time_vector[-1] // time_sample)
# Compute the resampled time series
res_data, res_time = scs.resample(data_vector, n_points, time_vector)
data['y'].append( res_data )
data['t'].append( res_time )
data['ti'].append(init_time)
data['lab'].append(curve.labeler)
To check if the new time sample was correctly placed and there is no more samples with outlier time samples. One can use the histogram of the time sample variation of all light curves to ensure the consistency of the sample time.
[9]:
import scipy.special as ssp
t_std = [stt.stdev(np.diff(t)) for t in data['t']]
hist, edges = np.histogram(t_std, density=True, bins=3)
p = visual.hist_plot(hist, edges,
title='Sample time consistency',
y_axis={'label': 'Density'},
x_axis={'label': 'Sample time difference (min)'})
visual.show_plot(p)
Filtering series¶
The main goal here is to remove the random signals that are contributing to the information, this actully will clean the time series. This is pretty interesting, because the applied filtering technique will not disturb the meaningfull information of the series, since it will only reduce the random information of the series.
[10]:
index = 5 # Curve index
p = visual.line_plot(data['t'][index],
data['y'][index],
legend_label='Raw Light Curve',
title='Light Curve',
y_axis={'label': 'Intensity'},
x_axis={'type': 'datetime',
'label': 'Date'})
visual.show_plot(p)
But the data is too noisy… To reduce the amount of noise on data we might use the PyAstronomy library that has some interesting smoothing algorithms, such as the hamming filter. We can also change the window size considered for filtering the light curve, here as an example we are using window = 33 samples.
Note that we are correcting the grammar and talking about smothing and not just filtering the data. In smothing techniques, it is spected that the provided data is in a discrete format. Because, actually, a mathematical filtering is applied… not a frequency filter.
[11]:
from PyAstronomy import pyasl
window = 33
algorithm = 'hamming'
sm1 = pyasl.smooth(data['y'][index], window, algorithm)
p = visual.line_plot(data['t'][index],
sm1,
legend_label='Raw Light Curve',
title='Light Curve',
y_axis={'label': 'Intensity'},
x_axis={'label': 'Time index'})
visual.show_plot(p)
Now we are talking!! Without the noise is possible to observe the actual variation of the time series, when the eclipses appear. To see the importance of the filtering process applied above, let’s try to do some feature engineering… and use the data to extract some information of the process.
Application example¶
We know, that the series from database/confirmed_targets are series that highly represents the transit photometry, since it has pronunced eclipses. So let’s try, to use the time series data to create an algorithm to determine the moments of eclipse on the time series.
The main idea is to create a binary label for the eclipse pattern from the light curve. For that, we could check the light curve derivative to analyse the time series variation along time. Usually when you have a high variation, the chances of having a eclipse is bigger. As an example, let’s plot the derivative of one particular light curve, index = 2. So for that, lets plot both the derivative of the non filtered time series, and the filtered one, to see if we can take anything out of these
informations.
[12]:
p = visual.line_plot(data['t'][index][1:],
np.diff(data['y'][index])/time_sample,
legend_label='Derivative of Light Curve',
title='Light Curve Derivative',
color_index=4,
y_axis={'label': 'Intensity'},
x_axis={'type': 'datetime',
'label': 'Date'})
p1 = visual.line_plot(data['t'][index][1:],
np.diff(sm1)/time_sample,
legend_label='Derivative of Light Curve',
title='Light Curve Derivative',
color_index=4,
y_axis={'label': 'Intensity'},
x_axis={'label': 'Time index'})
visual.show_plot(p, p1)
From these graphs it is possible to see, that with a simple threshold selection it is possible to infer the regions where we have the eclipses, in the filtered derivative version.
More practice guide… When dealing with time series, the most simple linear transformation (e.g. a derivative) could enhance the data noise in a very powerfull way. Interesting enough, the noise didn’t seem to be that big on the time series plot before right?! So after appling a simple linear trasnformation, the noise increased to be bigger to the biggest actual value of the data. Note the amplidutes of the highest value on the filtered version, and the not filtered one… in the not filtered version, one can only see noise!! > So imagine if you pass this not filtered data trough a complex neural network, that will apply several non linear transformation to your data… we are talking about CAOS! Even if you think you have interesting results, your algorithm is actually working in a very sensible place, that eventually he will go unstable.
Another topic to realise… the most treatening noise that one can have in a time series is this called white noise. This noise exists in all frequencies and only statistical and mathematical techniques that obey the law of large numbers can deal with it. And for that, you must have discrete time data!
Then, to introduce some statistics perspective to the result, and make it more automatic, the mean and standard deviation of the light curve variation is used to determine possible decision thresholds, that could infer the moments of initialization of the eclipse, and the finalization…
For that, lets compute the mean of the derivative and the standard deviation
[13]:
variation_mean = np.average(np.diff(sm1)/time_sample)
variation_stan = np.std( np.diff(sm1)/time_sample )
print("The varaition signal has a mean around {}".format(round(variation_mean,3)))
print("And a standard deviation around {}".format(round(variation_stan,2)))
The varaition signal has a mean around 0.035
And a standard deviation around 8.39
Therefore, one can create a threshold close to \(\pm \sigma\), \(\pm 2\sigma\) and \(\pm 3\sigma\), as one can see in the next bellow figure.
[14]:
size = data['t'][index][1:].shape[0]
one_std = variation_stan * np.ones((size,))
x_data = data['t'][index][1:]
y_data = [np.diff(sm1) / time_sample,
1*one_std, 2*one_std, 3*one_std, -1*one_std, -2*one_std, -3*one_std]
legends= ['Derivative', '68.0%', '95.0%', '99.7%', '68.0%', '95.0%', '99.7%']
colors = [8, 1, 3, 5, 1, 3, 5]
p = visual.multline_plot(x_data, y_data,
legend_label=legends,
title='Light Curve Derivative',
color_index=colors,
y_axis={'label': 'Intensity'},
x_axis={'label': 'Time index'})
visual.show_plot(p)
Now one can create a listener to check each peak and create the respective eclipse label. First it is necessary to check two states: the first is the state of in_eclipse and the second the out_eclipse. Which will map when the series goes into one eclipse, then out of the eclipse.
[15]:
trigger, out_eclipse = False, True # because it starts out of the eclipse
light_variation = np.diff(sm1) / time_sample
light_variation = light_variation.tolist()
threshold = [-1*variation_stan, 1*variation_stan]
light_state = [False]
size = len(light_variation)
for k in range(size):
if out_eclipse:
if light_variation[k] < threshold[0]:
out_eclipse = False
else:
if (light_variation[k-1] > threshold[1]) \
and (light_variation[k] < threshold[1]):
out_eclipse = True
light_state.append(out_eclipse)
With those results, lets see if we can plot the light curve and highlight the moments where we have a suposed eclipse.
[16]:
in_eclipse = np.ma.masked_where(np.array(light_state), sm1)
out_eclipse = np.ma.masked_where(~np.array(light_state), sm1)
x_data = data['t'][index]
y_data = [in_eclipse, out_eclipse]
legends= ['In eclipse', 'Out eclipse']
colors = [3, 7]
p1 = visual.multline_plot(x_data, y_data,
legend_label=legends,
title='Light Curve Derivative',
color_index=colors,
y_axis={'label': 'Intensity'},
x_axis={'label': 'Time index'})
visual.show_plot(p1, p)
Checkpoint!! One thing that we also need to do, is change the index variable and check each exoplanet curve and see if we could ensure that this algorithm works for most of then. And also with the bright stars and red giants…
After that we are ready to engage on more complex analysis, such as statistical approaches and machine learning techniques!
Notice one interesting thing: using the derivative approach, the steady state information of the light curve is automatically discarded! This is a hell of deal, since it is already a filter that mitigates the influence of low frequency siganls and highlight the effect of high frequency ones!!!! O.o Crazy!! Since we only want to analyse the bahavior of the high descent variation, when there are eclipses, the signal derivation is the approach that most highlight this feature. :)
Generation algorithms¶
Here, the algorithm presented above is implemented for each time serie curve and then we generate a file with the pre-processed data. So for that we need to run the following the next procedure. Since all time series are already resampled, only the procedure of filtering and labelling are necessary to be made for all time series. This is the function, that provided the resampled time series, it returns a the filtered and labeled data:
[17]:
def filter_series(data=None, window=33, algorithm="hamming"):
'''
This is the function used to filter all time series readed
in a batch process. And it could take some time to run...
'''
filtered_curves = {'r':[],'y':[],'t':[],'i':[], 'lab':[]}
for curve, time, init in zip(data['y'], data['t'], data['ti']):
filt_curve = pyasl.smooth(curve, window, algorithm)
filtered_curves['r'].append( curve )
filtered_curves['y'].append( filt_curve )
filtered_curves['t'].append( time )
filtered_curves['i'].append( init )
filtered_curves['lab'] = data['lab']
return filtered_curves
[18]:
filtered_data = filter_series(data=data)
Now that we have the filtered data, we could use the derivative algorithm to label each time series…
[ ]:
def label_series(data=None, time_sample=None, std_num=3):
'''
This is the function used to label the eclipses part of
the time series, using the filtered lgiht curve data in
a batch process. And it could take some time to run...
'''
data['eclipse_labels'] = []
for curve in data['y']:
derivative = np.diff( curve ) / time_sample
mean, stan = np.mean( derivative ), np.std( derivative )
derivative, threshold = derivative - mean, std_num * stan
light_state, out_eclipse = [False], True
for k in range(len(derivative)):
if out_eclipse:
if derivative[k] < - threshold:
out_eclipse = False
else:
if (derivative[k-1] > threshold) \
and (derivative[k] < threshold):
out_eclipse = True
light_state.append( out_eclipse )
data['eclipse_labels'].append( light_state )
return data
[ ]:
filtered_data = label_series(data=filtered_data, time_sample=time_sample)
Know we have some structure data in the filtered_data variable that is actually pretty suitable for Python users, which is composed only by dict, list, array and datetime. And follows this particular structure:
{
'r': [
array(...),
array(...),
⋮
array(...)
],
'y': [
array(...),
array(...),
⋮
array(...)
],
't': [
array(...),
array(...),
⋮
array(...)
],
'i': [
datetime,
datetime,
⋮
datetime
],
'lab': [
str,
str,
⋮
str
],
}
where each key is:
r: The raw intensity of each curvey: The filtered intensity of each curvet: The time index initialized in 0 of each curvei: The time of the first sample of each curvelab: The string with the label of this curve
As an example, to fetch the filtered intesity samples of the third curve, one just:
data = filtered_data['y'][2]
Save pre-processed data¶
Save as .mat file¶
This is just a function that will save the filtered data as a MATLAB data file:
[19]:
import scipy.io as sio
file_path = './filtered.mat'
sio.savemat(file_name=file_path, mdict=filtered_data)
Save as .pickle file¶
This is a fucntion to save the data as a pickle file to save the dictionary:
[20]:
import pickle
file_path = './filtered.pkl'
output = open(file_path, 'wb')
pickle.dump(filtered_data, output)
output.close()